Channel 和 goroutine 的结合是 Go 并发编程的大杀器。而 Channel 的实际应用也经常让人眼前一亮,通过与 select,cancel,timer 等结合,它能实现各种各样的功能。接下来,我们就要梳理一下 channel 的应用。

停止信号

“如何优雅地关闭 channel”那一节已经讲得很多了,这块就略过了。

channel 用于停止信号的场景还是挺多的,经常是关闭某个 channel 或者向 channel 发送一个元素,使得接收 channel 的那一方获知道此信息,进而做一些其他的操作。

任务定时

与 timer 结合,一般有两种玩法:实现超时控制,实现定期执行某个任务。

有时候,需要执行某项操作,但又不想它耗费太长时间,上一个定时器就可以搞定:

  1. select {
  2. case <-time.After(100 * time.Millisecond):
  3. case <-s.stopc:
  4. return false
  5. }

等待 100 ms 后,如果 s.stopc 还没有读出数据或者被关闭,就直接结束。这是来自 etcd 源码里的一个例子,这样的写法随处可见。

定时执行某个任务,也比较简单:

  1. func worker() {
  2. ticker := time.Tick(1 * time.Second)
  3. for {
  4. select {
  5. case <- ticker:
  6. // 执行定时任务
  7. fmt.Println("执行 1s 定时任务")
  8. }
  9. }
  10. }

每隔 1 秒种,执行一次定时任务。

解耦生产方和消费方

服务启动时,启动 n 个 worker,作为工作协程池,这些协程工作在一个 for {} 无限循环里,从某个 channel 消费工作任务并执行:

  1. func main() {
  2. taskCh := make(chan int, 100)
  3. go worker(taskCh)
  4. // 塞任务
  5. for i := 0; i < 10; i++ {
  6. taskCh <- i
  7. }
  8. // 等待 1 小时
  9. select {
  10. case <-time.After(time.Hour):
  11. }
  12. }
  13. func worker(taskCh <-chan int) {
  14. const N = 5
  15. // 启动 5 个工作协程
  16. for i := 0; i < N; i++ {
  17. go func(id int) {
  18. for {
  19. task := <- taskCh
  20. fmt.Printf("finish task: %d by worker %d\n", task, id)
  21. time.Sleep(time.Second)
  22. }
  23. }(i)
  24. }
  25. }

5 个工作协程在不断地从工作队列里取任务,生产方只管往 channel 发送任务即可,解耦生产方和消费方。

程序输出:

  1. finish task: 1 by worker 4
  2. finish task: 2 by worker 2
  3. finish task: 4 by worker 3
  4. finish task: 3 by worker 1
  5. finish task: 0 by worker 0
  6. finish task: 6 by worker 0
  7. finish task: 8 by worker 3
  8. finish task: 9 by worker 1
  9. finish task: 7 by worker 4
  10. finish task: 5 by worker 2

控制并发数

有时需要定时执行几百个任务,例如每天定时按城市来执行一些离线计算的任务。但是并发数又不能太高,因为任务执行过程依赖第三方的一些资源,对请求的速率有限制。这时就可以通过 channel 来控制并发数。

下面的例子来自《Go 语言高级编程》:

  1. var limit = make(chan int, 3)
  2. func main() {
  3. // …………
  4. for _, w := range work {
  5. go func() {
  6. limit <- 1
  7. w()
  8. <-limit
  9. }()
  10. }
  11. // …………
  12. }

构建一个缓冲型的 channel,容量为 3。接着遍历任务列表,每个任务启动一个 goroutine 去完成。真正执行任务,访问第三方的动作在 w() 中完成,在执行 w() 之前,先要从 limit 中拿“许可证”,拿到许可证之后,才能执行 w(),并且在执行完任务,要将“许可证”归还。这样就可以控制同时运行的 goroutine 数。

这里,limit <- 1 放在 func 内部而不是外部,原因是:

如果在外层,就是控制系统 goroutine 的数量,可能会阻塞 for 循环,影响业务逻辑。

limit 其实和逻辑无关,只是性能调优,放在内层和外层的语义不太一样。

还有一点要注意的是,如果 w() 发生 panic,那“许可证”可能就还不回去了,因此需要使用 defer 来保证。

参考资料

【channel 应用】https://www.s0nnet.com/archives/go-channels-practice

【应用举例】https://zhuyasen.com/post/go_queue.html

【应用】https://tonybai.com/2014/09/29/a-channel-compendium-for-golang/

【Go 语言高级并发编程】https://chai2010.cn/advanced-go-programming-book/